package fetcher;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;

import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

import confs.DataType;
import confs.SingleParameter;
import graybox.ParameterDistrict;
import huristic.HuristicConf;
import confs.DataType.typeProperty;
import others.Global;


import java.util.regex.*;
/*
 * Spark中所有可以配置的性能相关的参数均在这里
 * fields: 所有参数
 * function：初始化并将所有参数都赋予默认值
 * Spark 2.1.0
 * */
public class SparkConfiguration {

	/**
	 * Pre-compiled pattern extracting the text between {@code <td>...</td>} cells.
	 * Compiled once (it is stateless and thread-safe) instead of on every
	 * {@link #findConfInHTML(String)} call.
	 */
	private static final Pattern TD_CELL_PATTERN = Pattern.compile("<td>([^<>]*)</td>");

	public int f_x = -1;

	/**
	 * Map of &lt;full parameter name&gt; -&gt; &lt;parameter configuration&gt;.
	 * Conventions used for the stored values:
	 * storage sizes are in MB; boolean values are "true"/"false";
	 * parameters without a default value hold null.
	 */
	public Map<String, SingleParameter> parameterMap = null;

	/**
	 * Initializes every tunable performance-related Spark 2.1.0 parameter with
	 * its documented default value. A TreeMap keeps the parameters sorted by
	 * name so iteration order is deterministic.
	 */
	public SparkConfiguration(){
		parameterMap = new TreeMap<String, SingleParameter>();

		// Parameters already finalized by the heuristic tuning stage.
		parameterMap.put("spark.task.cpus", new SingleParameter("spark.task.cpus","1",DataType.typeProperty.INT,"Number of cores to allocate for each task."));
		parameterMap.put("spark.driver.cores", new SingleParameter("spark.driver.cores","1",DataType.typeProperty.INT,"Number of cores to use for the driver process, only in cluster mode."));
		parameterMap.put("spark.driver.memory", new SingleParameter("spark.driver.memory","1g",DataType.typeProperty.INT,"Amount of memory to use for the driver process, i.e. where SparkContext is initialized. (e.g. 1g, 2g). "+
						"Note: In client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, please set this through the --driver-memory command line option or in your default properties file."));
		parameterMap.put("spark.executor.cores", new SingleParameter("spark.executor.cores","1",DataType.typeProperty.INT,"The number of cores to use on each executor. In standalone and Mesos coarse-grained modes, setting this parameter allows an application to run multiple executors on the same worker,"+
						" provided that there are enough cores on that worker. Otherwise, only one executor per application will run on each worker.")); // default: 1 in YARN mode, all available worker cores in standalone / Mesos coarse-grained modes
		parameterMap.put("spark.executor.memory", new SingleParameter("spark.executor.memory","1g",DataType.typeProperty.INT,"Amount of memory to use per executor process (e.g. 2g, 8g)."));

		// Parallelism is also tuned heuristically; it has no documented default, so it is not listed here:
		//parameterMap.put("spark.default.parallelism", new SingleParameter("spark.default.parallelism","",DataType.typeProperty.INT));
		// Not tunable — changing it may make jobs fail:
		//parameterMap.put("spark.driver.maxResultSize", new SingleParameter("spark.driver.maxResultSize","1g",DataType.typeProperty.INT));

		// Shuffle read & write.
		parameterMap.put("spark.reducer.maxSizeInFlight", new SingleParameter("spark.reducer.maxSizeInFlight","48m",DataType.typeProperty.INT,"Maximum "
					+"size of map outputs to fetch simultaneously from each reduce task. Since each output requires us to create a buffer to receive it, this represents a fixed memory overhead per reduce task, so keep it small unless you have a large amount of memory."));
		parameterMap.put("spark.shuffle.compress", new SingleParameter("spark.shuffle.compress","TRUE",DataType.typeProperty.BOOLEAN,"Whether to compress map output files. Generally a good idea. Compression will use spark.io.compression.codec."));
		parameterMap.put("spark.shuffle.spill.compress", new SingleParameter("spark.shuffle.spill.compress","TRUE",DataType.typeProperty.BOOLEAN,"	Whether to compress data spilled during shuffles. Compression will use spark.io.compression.codec."));
		parameterMap.put("spark.shuffle.file.buffer", new SingleParameter("spark.shuffle.file.buffer","32k",DataType.typeProperty.INT,"Size of the in-memory buffer for each shuffle file output stream. These buffers reduce the number of disk seeks and "+
					"system calls made in creating intermediate shuffle files."));

		// Broadcast — no observable effect in experiments.
		parameterMap.put("spark.broadcast.compress", new SingleParameter("spark.broadcast.compress","TRUE",DataType.typeProperty.BOOLEAN,"	Whether to compress broadcast variables before sending them. Generally a good idea."));
		parameterMap.put("spark.broadcast.blockSize", new SingleParameter("spark.broadcast.blockSize","4m",DataType.typeProperty.INT,"	Size of each piece of a block for TorrentBroadcastFactory. Too large a value decreases parallelism during broadcast (makes it slower); "+
						"however, if it is too small, BlockManager might take a performance hit."));
		parameterMap.put("spark.rpc.message.maxSize", new SingleParameter("spark.rpc.message.maxSize","128",DataType.typeProperty.INT,"Maximum message size (in MB) to allow in \"control plane\" "+
						"communication; generally only applies to map output size information sent between executors and the driver. Increase this if you are running jobs with many thousands of map and reduce tasks and see messages about the RPC message size."));

		// Memory.
		parameterMap.put("spark.memory.fraction", new SingleParameter("spark.memory.fraction","0.6",DataType.typeProperty.DOUBLE,"Fraction of (heap space - 300MB) used for execution and storage. The lower this is, the more frequently spills and "+
						"cached data eviction occur. The purpose of this config is to set aside memory for internal metadata, user data structures, and imprecise size estimation in the case of sparse, unusually large records. Leaving this at the default value is recommended. "+
						"For more detail, including important information about correctly tuning JVM garbage collection when increasing this value, see this description."));
		parameterMap.put("spark.memory.storageFraction", new SingleParameter("spark.memory.storageFraction","0.5",DataType.typeProperty.DOUBLE,"Amount of storage memory immune to eviction, expressed as a fraction of the size"+
						" of the region set aside by s​park.memory.fraction. The higher this is, the less working memory may be available to execution and tasks may spill to disk more often. Leaving this at the default value is recommended. For more detail, see this description."));
		parameterMap.put("spark.rdd.compress", new SingleParameter("spark.rdd.compress","FALSE",DataType.typeProperty.BOOLEAN,"	Whether to compress serialized RDD partitions (e.g. for StorageLevel.MEMORY_ONLY_SER in Java and Scala or StorageLevel.MEMORY_ONLY in Python)."+
						" Can save substantial space at the cost of some extra CPU time."));

		// Compression codec.
		parameterMap.put("spark.io.compression.codec", new SingleParameter("spark.io.compression.codec",new String[]{"lz4","lzf","snappy"},DataType.typeProperty.CATEGORY_3,"	The codec used to compress internal data such as RDD partitions, broadcast "+
						"variables and shuffle outputs. By default, Spark provides three codecs: lz4, lzf, and snappy. You can also use fully qualified class names to specify the codec, e.g. org.apache.spark.io.LZ4CompressionCodec, org.apache.spark.io.LZFCompressionCodec, and org.apache.spark.io.SnappyCompressionCodec."));

		// Unresolved:
		//params.put("spark.storage.memoryMapThreshold", new SingleParameter("spark.storage.memoryMapThreshold","2m",DataType.typeProperty.INT));
	}

	/**
	 * Workaround: parses the configuration out of a History Server HTML page.
	 * The page lists option names and values as alternating {@code <td>} cells,
	 * so a parameter's value is assumed to be the cell immediately following
	 * its name. Matching parameters overwrite the defaults in
	 * {@link #parameterMap}.
	 *
	 * @param html raw HTML of the History Server environment page
	 */
	public void findConfInHTML(String html){
		APIFetcher.APIOutPut(html);
		APIFetcher.APIOutPut("#############从HTML文件中找到如下配置参数#####################");
		Matcher matcher = TD_CELL_PATTERN.matcher(html);
		// Names and values are interleaved in document order in this list.
		ArrayList<String> conf_value = new ArrayList<String>();
		while (matcher.find()){
			// Capture group 1 holds the cell text without the surrounding tags.
			conf_value.add(matcher.group(1));
			System.out.println(matcher.group(1));
		}

		for (String key : parameterMap.keySet()){
			int index = conf_value.indexOf(key);
			// index+1 bounds check: a name appearing as the very last cell has
			// no value cell after it; the old code threw IndexOutOfBoundsException.
			if (index >= 0 && index + 1 < conf_value.size()){
				APIFetcher.APIOutPut(key+"           "+conf_value.get(index+1));
				// Overwrite the default with the value parsed from the page.
				parameterMap.get(key).value = parameterMap.get(key).toValue(conf_value.get(index+1));
			}
		}

		APIFetcher.APIOutPut("#############################################################");
	}

	/**
	 * Prints every parameter held in this configuration; purely for visual
	 * inspection, no side effects besides console output.
	 */
	public void printChosenConf() {
		System.out.println("######################  所有在SparkConfiguration中的参数参数配置   ############################");
		// Enhanced for-loop replaces the old raw-typed Iterator with its unchecked casts.
		for (Map.Entry<String, SingleParameter> entry : parameterMap.entrySet()){
			System.out.println(entry.getKey()+"--------------"+entry.getValue().toString());
		}
		System.out.println("#########################################################################");
	}

	/**
	 * Serializes the chosen parameters as one space-separated line handed to
	 * the python side, in GROUP0 + GROUP1 + GROUP2 + GROUP3 order. The "dummy"
	 * placeholder name encodes as -99999. Exits the JVM on a parameter whose
	 * type has no defined encoding.
	 *
	 * @return one line of space-separated values, terminated by '\n'
	 */
	@Override
	public String toString() {
		int length1 = Global.CHOSEN_PARAMETERS_GROUP1.length;
		int length2 = Global.CHOSEN_PARAMETERS_GROUP2.length;
		int length3 = Global.CHOSEN_PARAMETERS_GROUP3.length;

		// Concatenate GROUP1+GROUP2+GROUP3 ...
		String[] chosenParameterList = Arrays.copyOf(Global.CHOSEN_PARAMETERS_GROUP1, length1 + length2 + length3);
		System.arraycopy(Global.CHOSEN_PARAMETERS_GROUP2, 0, chosenParameterList, length1, length2);
		System.arraycopy(Global.CHOSEN_PARAMETERS_GROUP3, 0, chosenParameterList, length1 + length2, length3);

		// ... then prepend GROUP0.
		String[] tmp2 = Arrays.copyOf(Global.CHOSEN_PARAMETERS_GROUP0, chosenParameterList.length + Global.CHOSEN_PARAMETERS_GROUP0.length);
		System.arraycopy(chosenParameterList, 0, tmp2, Global.CHOSEN_PARAMETERS_GROUP0.length, chosenParameterList.length);
		chosenParameterList = tmp2;

		// StringBuilder avoids the O(n^2) cost of += concatenation in a loop.
		StringBuilder ret = new StringBuilder();
		for (String key : chosenParameterList){
			if (key.equals("dummy")){
				ret.append(String.valueOf(-99999)).append(' ');
				continue;
			}
			SingleParameter tmp = this.parameterMap.get(key);
			// INT/memory, BOOLEAN, DOUBLE and CATEGORY types all encode as their raw value.
			if (tmp.typeProperty.equals(DataType.typeProperty.INT)
					|| tmp.typeProperty.equals(DataType.typeProperty.BOOLEAN)
					|| tmp.typeProperty.equals(DataType.typeProperty.DOUBLE)
					|| tmp.typeProperty.equals(DataType.typeProperty.CATEGORY_2)
					|| tmp.typeProperty.equals(DataType.typeProperty.CATEGORY_3)){
				ret.append(String.valueOf(tmp.value)).append(' ');
			}
			else {
				// A categorical encoding would require knowing the full value
				// domain, so an unknown type is a fatal configuration error.
				System.err.println(tmp.name);
				System.err.println("SparkConfiguration没有处理过的调优参数");
				System.exit(1);
			}
		}
		ret.append('\n');
		return ret.toString();
	}

	/**
	 * Like {@link #toString()} but restricted to a single parameter group;
	 * the "dummy" placeholder encodes as 1234567 here. Currently unused —
	 * the three predictors are separated in this version, so values outside
	 * the selected group do not affect the result anyway.
	 *
	 * @param groupName one of "group1", "group2", "group3"
	 * @return one line of space-separated values, terminated by '\n'
	 * @throws IllegalArgumentException on an unknown group name (the old code
	 *         fell through the switch and crashed with a NullPointerException)
	 */
	public String toString(String groupName) {
		String[] chosenParameterList;
		switch (groupName) {
		case "group1":
			chosenParameterList = Global.CHOSEN_PARAMETERS_GROUP1;
			break;
		case "group2":
			chosenParameterList = Global.CHOSEN_PARAMETERS_GROUP2;
			break;
		case "group3":
			chosenParameterList = Global.CHOSEN_PARAMETERS_GROUP3;
			break;
		default:
			throw new IllegalArgumentException("Unknown parameter group: " + groupName);
		}

		StringBuilder ret = new StringBuilder();
		for (String key : chosenParameterList){
			if (key.equals("dummy")) {
				ret.append(String.valueOf(1234567)).append(' ');
				continue;
			}
			SingleParameter tmp = this.parameterMap.get(key);
			// BOOLEAN added for consistency with the no-arg toString(); the old
			// version killed the JVM on BOOLEAN parameters in a group.
			if (tmp.typeProperty.equals(DataType.typeProperty.INT)
					|| tmp.typeProperty.equals(DataType.typeProperty.BOOLEAN)
					|| tmp.typeProperty.equals(DataType.typeProperty.DOUBLE)
					|| tmp.typeProperty.equals(DataType.typeProperty.CATEGORY_2)
					|| tmp.typeProperty.equals(DataType.typeProperty.CATEGORY_3)){
				ret.append(String.valueOf(tmp.value)).append(' ');
			}
			else {
				System.err.println("没有处理过的调优参数");
				System.exit(1);
			}
		}
		ret.append('\n');
		return ret.toString();
	}

	/**
	 * Renders all parameters of the requested groups as spark-submit
	 * command-line arguments, one "--conf name=value" entry per parameter.
	 *
	 * @param groupNameList group names resolved by {@link ParameterDistrict}
	 * @return one shell-ready "--conf" line per selected parameter
	 */
	public ArrayList<String> toConfList(ArrayList<String> groupNameList) {
		ArrayList<String> ret = new ArrayList<String>();
		String[] chosenParameterList = ParameterDistrict.getChosenParameterList(groupNameList);
		for (String key : chosenParameterList){
			if (key.equals("dummy")) continue;
			SingleParameter tmp = this.parameterMap.get(key);
			ret.add("\t--conf "+tmp.name+"="+tmp.toString());
		}
		return ret;
	}

	/**
	 * Serializes all chosen parameters (GROUP0 through GROUP3) as a JSON
	 * array; each element carries name, tuned value, default value and
	 * description.
	 *
	 * @return the JSON array as a string
	 * @throws JSONException on JSON construction failure
	 */
	public String toJsonString() throws JSONException{
		int length0 = Global.CHOSEN_PARAMETERS_GROUP0.length;
		int length1 = Global.CHOSEN_PARAMETERS_GROUP1.length;
		int length2 = Global.CHOSEN_PARAMETERS_GROUP2.length;
		int length3 = Global.CHOSEN_PARAMETERS_GROUP3.length;

		// Grow-and-copy: GROUP0 first, then GROUP1..GROUP3 appended in order.
		String[] chosenParameterList = Arrays.copyOf(Global.CHOSEN_PARAMETERS_GROUP0, length0 + length1 + length2 + length3);
		System.arraycopy(Global.CHOSEN_PARAMETERS_GROUP1, 0, chosenParameterList, length0, length1);
		System.arraycopy(Global.CHOSEN_PARAMETERS_GROUP2, 0, chosenParameterList, length0 + length1, length2);
		System.arraycopy(Global.CHOSEN_PARAMETERS_GROUP3, 0, chosenParameterList, length0 + length1 + length2, length3);

		JSONArray jsonArray = new JSONArray();
		int pos = 0;
		for (String key : chosenParameterList){
			if (key.equals("dummy")) continue;
			SingleParameter tmpParameter = this.parameterMap.get(key);
			Map<String, Object> map = new HashMap<String, Object>();
			map.put("name", tmpParameter.name);
			map.put("tuned_value", tmpParameter.toString());
			map.put("default_value", tmpParameter.defaultValue);
			map.put("description", tmpParameter.description);
			// Bug fix: the old code built a JSONObject but inserted the raw Map,
			// which older org.json versions serialize via Map.toString() —
			// producing "{k=v, ...}" instead of valid JSON.
			jsonArray.put(pos, new JSONObject(map));
			pos++;
		}
		return jsonArray.toString();
	}

	/**
	 * Copies the heuristically tuned values into {@link #parameterMap}, purely
	 * so they appear in the JSON returned by {@link #toJsonString()}.
	 *
	 * @param conf heuristic tuning result; only the first executor
	 *        configuration entry is used
	 */
	public void setHuristicConfs(HuristicConf conf) {
		SingleParameter tmpParameter = this.parameterMap.get("spark.executor.cores");
		tmpParameter.value = conf.executorConfList.get(0).executor_cores;

		tmpParameter = this.parameterMap.get("spark.executor.memory");
		tmpParameter.value = conf.executorConfList.get(0).executor_memory;

		tmpParameter = this.parameterMap.get("spark.task.cpus");
		tmpParameter.value = conf.executorConfList.get(0).cores_per_task;

		// Driver gets half the master's cores and a third of its memory.
		tmpParameter = this.parameterMap.get("spark.driver.cores");
		tmpParameter.value = Global.CORE_MASTER / 2;

		tmpParameter = this.parameterMap.get("spark.driver.memory");
		tmpParameter.value = Global.MEMORY_MASRTER / 3;
	}

}
